/*
 * ============================================================================
 *
 *       Filename:  thread_queue.h
 *
 *    Description:  Multiple Thread FIFO Queue
 *
 *        Version:  1.0
 *        Created:  06/21/2016 02:15:29 PM
 *
 *         Author:  Dou Yu, douyu@ncic.ac.cn
 *   Organization:  NCIC
 *
 * ============================================================================
 */

#ifndef __COMMON_THREAD_QUEUE_H_
#define __COMMON_THREAD_QUEUE_H_
#include<pthread.h>
#include<stdlib.h>

/*
 * Bounded FIFO of opaque item pointers shared between threads.
 *
 * Items are kept in a circular array: thread_queue_offer() writes at `tail`,
 * thread_queue_poll()/thread_queue_trypoll() read at `head`, and both indices
 * wrap back to 0 when they reach `size`.
 */
typedef struct thread_queue thread_queue_t;
struct thread_queue {
	void **buf;             /* circular array holding `size` item pointers */
	int size;               /* capacity of `buf`, in items */
	int head;               /* index of the next item to be taken out */
	int tail;               /* index of the next slot to be written */
	int count;              /* number of items currently stored */
	int is_clean;           /* set to 1 by thread_queue_clean() */
	pthread_mutex_t *lock;  /* mutex held while the fields above are accessed */
	pthread_cond_t *full;   /* waited on by offer while count == size */
	pthread_cond_t *empty;  /* waited on by poll while count == 0 */
};

/*
 * Allocate and initialise an empty queue able to hold `size` item pointers.
 *
 * size:    capacity of the queue; must be greater than zero.
 * returns: a new queue to be released with thread_queue_destroy(), or NULL
 *          if `size` is not positive, an allocation fails, or a pthread
 *          object cannot be initialised. Nothing is leaked on failure.
 *
 * The casts on the allocation results are kept from the original code in
 * case this header is also included from C++ translation units.
 */
static inline thread_queue_t * thread_queue_init(int size) {
	thread_queue_t *queue;

	/* A zero-capacity queue would make every offer block forever. */
	if (size <= 0)
		return NULL;

	queue = (thread_queue_t *) malloc(sizeof *queue);
	if (queue == NULL)
		return NULL;

	/* calloc rejects a size * sizeof(void *) product that would overflow. */
	queue->buf = (void **) calloc((size_t) size, sizeof *queue->buf);
	queue->lock = (pthread_mutex_t *) malloc(sizeof *queue->lock);
	queue->full = (pthread_cond_t *) malloc(sizeof *queue->full);
	queue->empty = (pthread_cond_t *) malloc(sizeof *queue->empty);
	if (queue->buf == NULL || queue->lock == NULL
			|| queue->full == NULL || queue->empty == NULL)
		goto fail_alloc;

	if (pthread_mutex_init(queue->lock, NULL) != 0)
		goto fail_alloc;
	if (pthread_cond_init(queue->full, NULL) != 0)
		goto fail_mutex;
	if (pthread_cond_init(queue->empty, NULL) != 0)
		goto fail_full;

	queue->size = size;
	queue->head = 0;
	queue->tail = 0;
	queue->count = 0;
	queue->is_clean = 0;
	return queue;

	/* Unwind in reverse order of construction; free(NULL) is a no-op. */
fail_full:
	pthread_cond_destroy(queue->full);
fail_mutex:
	pthread_mutex_destroy(queue->lock);
fail_alloc:
	free(queue->empty);
	free(queue->full);
	free(queue->lock);
	free(queue->buf);
	free(queue);
	return NULL;
}

/*
 * Destroy the queue's mutex and condition variables and free all memory
 * owned by the queue, including the queue structure itself.
 *
 * queue: queue obtained from thread_queue_init(), or NULL (no-op, so the
 *        result of a failed thread_queue_init() can be passed safely).
 *
 * Items still stored in the queue are not freed; only the pointer array is.
 * NOTE(review): the caller is presumably responsible for making sure no
 * other thread is still using the queue when this is called.
 */
static inline void thread_queue_destroy(thread_queue_t* queue) {
	if (queue == NULL)
		return;

	pthread_mutex_destroy(queue->lock);
	free(queue->lock);
	pthread_cond_destroy(queue->full);
	free(queue->full);
	pthread_cond_destroy(queue->empty);
	free(queue->empty);
	free(queue->buf);
	free(queue);
}

/*
 * Append `item` at the tail of the queue, blocking while the queue is full.
 *
 * queue: queue to append to.
 * item:  pointer to store; the queue does not interpret or copy it.
 *
 * If the queue is full and thread_queue_clean() has been called, the call
 * returns without storing `item`. The function returns void, so the caller
 * cannot tell from the return value whether the item was stored.
 */
static inline void thread_queue_offer(thread_queue_t* queue, void *item) {
	pthread_mutex_lock(queue->lock);
	while (queue->count == queue->size) {
		/*
		 * Test the flag before sleeping, not only after waking up:
		 * thread_queue_clean() broadcasts only when it is called, so a
		 * producer arriving at a full queue afterwards would otherwise
		 * wait with nothing left to wake it.
		 */
		if (queue->is_clean) {
			pthread_mutex_unlock(queue->lock);
			return;
		}
		pthread_cond_wait(queue->full, queue->lock);
	}
	queue->buf[queue->tail] = item;
	/* Wrap the write index around the circular buffer. */
	if (++queue->tail == queue->size)
		queue->tail = 0;
	queue->count++;
	pthread_mutex_unlock(queue->lock);
	/* Wake consumers blocked in thread_queue_poll(). */
	pthread_cond_broadcast(queue->empty);
}

/*
 * Remove and return the item at the head of the queue, blocking while the
 * queue is empty.
 *
 * queue:   queue to take from.
 * returns: the oldest stored item pointer, or NULL when the queue is empty
 *          and thread_queue_clean() has been called. Items still stored
 *          after a clean are returned before NULL is.
 */
static inline void * thread_queue_poll(thread_queue_t* queue) {
	void *item;

	pthread_mutex_lock(queue->lock);

	/* Sleep until there is something to take or the queue was cleaned. */
	while (queue->count == 0 && !queue->is_clean)
		pthread_cond_wait(queue->empty, queue->lock);

	/* Leaving the loop with nothing stored means the queue was cleaned. */
	if (queue->count == 0) {
		pthread_mutex_unlock(queue->lock);
		return NULL;
	}

	item = queue->buf[queue->head];
	/* Advance the read index, wrapping around the circular buffer. */
	queue->head = (queue->head + 1) % queue->size;
	queue->count -= 1;

	pthread_mutex_unlock(queue->lock);
	/* Wake producers blocked in thread_queue_offer(). */
	pthread_cond_broadcast(queue->full);
	return item;
}

/*
 * Non-blocking variant of thread_queue_poll(): remove and return the item at
 * the head of the queue if there is one.
 *
 * queue:   queue to take from.
 * returns: the oldest stored item pointer, or NULL if the queue is empty.
 */
static inline void * thread_queue_trypoll(thread_queue_t* queue) {
	void *item;

	pthread_mutex_lock(queue->lock);

	/* Nothing stored: give up immediately instead of waiting. */
	if (queue->count == 0) {
		pthread_mutex_unlock(queue->lock);
		return NULL;
	}

	item = queue->buf[queue->head];
	/* Advance the read index, wrapping around the circular buffer. */
	queue->head = (queue->head + 1) % queue->size;
	queue->count -= 1;

	pthread_mutex_unlock(queue->lock);
	/* Wake producers blocked in thread_queue_offer(). */
	pthread_cond_broadcast(queue->full);
	return item;
}

/*
 * Mark the queue as cleaned and wake every thread blocked on it.
 *
 * queue: queue to mark.
 *
 * Sets is_clean to 1 and broadcasts on both condition variables so that
 * waiting threads re-check the flag. Stored items are left in place.
 */
static inline void thread_queue_clean(thread_queue_t* queue) {
	pthread_mutex_lock(queue->lock);
	/*
	 * The flag is written while holding the lock because the other
	 * queue functions read it under the same lock; an unlocked store
	 * would race with those reads.
	 */
	queue->is_clean = 1;
	pthread_cond_broadcast(queue->empty);
	pthread_cond_broadcast(queue->full);
	pthread_mutex_unlock(queue->lock);
}

#endif

/*end of file*/

